
[SPARK-27190][SQL] add table capability for streaming #24129

Closed
cloud-fan wants to merge 6 commits into master from cloud-fan/capability

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Mar 18, 2019

What changes were proposed in this pull request?

This is a followup of #24012 , to add the corresponding capabilities for streaming.

How was this patch tested?

existing tests

@cloud-fan changed the title from "[SPARK-27190][] add table capability for streaming" to "[SPARK-27190][SQL] add table capability for streaming" on Mar 18, 2019
@cloud-fan
Contributor Author

cc @rdblue @jose-torres

@rdblue
Contributor

rdblue commented Mar 18, 2019

@cloud-fan, I think this should update V2WriteSupportCheck to validate that tables used by streaming relations support the read and write capabilities. That keeps the analyzer up to date with the v2 plans, so that when there are other ways to create streaming plans (like SQL), we don't have lurking bugs.

Contributor

@jose-torres jose-torres left a comment

Mostly looks good.

I'm not familiar with V2WriteSupportCheck, but if it's a reasonably small change, I agree it would make sense to update it.

@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 }

 class KafkaTable(strategy: => ConsumerStrategy) extends Table
-  with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
+  with SupportsRead with SupportsWrite {
Contributor

I don't necessarily think we should go back to the design, but it is a bit weird in context to see that SupportsRead and SupportsWrite end up being super-capabilities configured differently than normal capabilities.

Contributor Author

I raised the same concern before. @rdblue, shall we reconsider it? I don't think we need the flexibility to change the read/write API (it would be a breaking change anyway). It's more important to make the API self-consistent and have a single capability API.

Contributor

I don't think of these as super-capabilities, I think of them as a link between the v2 catalog API and the v2 source API. Using a trait like SupportsRead means we can later make changes to either one and continue to use the other. For example, if we added a Stream abstraction instead of using Table for both, then we would be able to have Stream implement SupportsRead. Does that make sense?
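For readers following this thread, here is a simplified, self-contained sketch of the decoupling rdblue describes. All names and signatures below are illustrative stand-ins (deliberately simplified, not the actual v2 interfaces or their exact hierarchy); the point is only that planner code depending on the SupportsRead mix-in doesn't care whether a Table or some future Stream provides it.

```scala
// Illustrative stand-ins only -- not the real org.apache.spark.sql.sources.v2 interfaces.
object SupportsReadSketch {
  trait ScanBuilder                              // entry point into the read API

  trait SupportsRead {                           // the "link": anything readable exposes a scan builder
    def newScanBuilder(options: Map[String, String]): ScanBuilder
  }

  // Today, tables mix in SupportsRead...
  trait Table { def name(): String }
  class KafkaTable extends Table with SupportsRead {
    override def name(): String = "kafka"
    override def newScanBuilder(options: Map[String, String]): ScanBuilder = new ScanBuilder {}
  }

  // ...but a future Stream abstraction could mix in the same trait, and planner code that
  // depends only on SupportsRead would keep working unchanged.
  trait Stream { def name(): String }
  class KafkaStream extends Stream with SupportsRead {
    override def name(): String = "kafka-stream"
    override def newScanBuilder(options: Map[String, String]): ScanBuilder = new ScanBuilder {}
  }

  // Planner-side code depends only on the mix-in, not on Table vs Stream.
  def planScan(readable: SupportsRead): ScanBuilder = readable.newScanBuilder(Map.empty)
}
```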

Contributor Author

My understanding is that the Table interface is the link between the v2 source API and the v2 catalog API. If we want to change the abstraction later, that will be very painful; moving the read/write methods into separate interfaces doesn't help much.

Contributor

I agree with @jose-torres. The current mix is very confusing. It's difficult to know what to use.

val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = v1.get.dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]")
Contributor

I know this wasn't introduced by the current PR, but this shouldn't say DataSourceV2.

Contributor Author

Shall we mention something about v1 and v2? Some sources have both v1 and v2 implementations, and it might be helpful to have a log line saying which implementation is actually used.

Contributor

I think it would be valuable to include whether v1 or v2 is used when the source supports both.
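For illustration, a minimal sketch of the kind of log message being discussed; the helper name and the exact wording are assumptions, not what the PR actually logs.

```scala
// Sketch only: the helper and its wording are hypothetical.
object SourceLogging {
  def describeSource(srcName: String, source: AnyRef, usingV1: Boolean): String = {
    val api = if (usingV1) "v1 Source" else "v2 Table"
    s"Stream source '$srcName' resolved to $api [$source]"
  }
}
// e.g. logInfo(SourceLogging.describeSource(srcName, source, usingV1 = true))
```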

@SparkQA

SparkQA commented Mar 18, 2019

Test build #103608 has finished for PR 24129 at commit b6363f2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

@rdblue The check for the streaming side is done in MicroBatchExecution and ContinuousExecution, and the related tests are in StreamingDataSourceV2Suite.

I don't think it's an easy change to move the check to V2WriteSupportCheck, because

  1. streaming needs to check the read side as well: a streaming source may only support micro-batch and can't be used in continuous mode.
  2. the check needs to know the current streaming mode, which is only available in MicroBatchExecution and ContinuousExecution.

If we do want to move the check, we should do it in another PR, which would introduce a non-trivial refactor.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 19, 2019

Test build #4636 has started for PR 24129 at commit b6363f2.

@SparkQA

SparkQA commented Mar 19, 2019

Test build #103677 has started for PR 24129 at commit b6363f2.

* An internal base interface of mix-in interfaces for writable {@link Table}. This adds
* {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* for batch or streaming.
* A mix-in interface of {@link Table}, to indicate that it's readable.
Contributor

@rdblue rdblue Mar 19, 2019

@jose-torres, @cloud-fan: Maybe the capability/trait distinction would be more clear if this stated "... to indicate that it's readable using the v2 datasource API".

Contributor Author

We don't have a plan to introduce something similar to the Table API. I'd like to be clear right now and say this is a mix-in trait for Table.

Contributor

That's fine with me. Can we make this change part of a separate documentation update?

 */
 @Evolving
-public interface Table {
+public interface Table extends BaseStreamingSink {
Contributor

@rdblue rdblue Mar 19, 2019

Why did this add BaseStreamingSink? It doesn't make sense to me that we would require all tables to be streaming sinks.

Contributor Author

BaseStreamingSink is an empty internal interface, which is the common interface for v1 and v2 streaming sinks. We will remove it after DS v2 is finalized and the v1 streaming sink can be removed.

If we don't want to pollute the Table interface, we can create a

public interface StreamingTable extends Table, BaseStreamingSink

Contributor

@cloud-fan, your reply is not a justification for adding this. Please answer: Why is this change required for this commit to function correctly?

Contributor

@jose-torres jose-torres Mar 21, 2019

It's required because it's the common interface for streaming in V1 and V2. A streaming sink is represented by a Sink in V1 or a Table in V2; BaseStreamingSink is used in places where it doesn't matter which kind of sink it is. If this did not extend BaseStreamingSink the code wouldn't compile.

Contributor Author

Because Table is used to identify a source in MicroBatchExecution, to track the progress of all the sources in a micro-batch query.

This is only a problem in MicroBatchExecution, because it needs to support micro-batch source v1 (there is no continuous source v1) and DS v2 together. The way we did it is to create a common interface as the source representation for both micro-batch source v1 and DS v2, so that we can unify the code that tracks the progress of sources; see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
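A simplified sketch of the pattern described above, using stand-in names (StreamEndpoint, V1Source, V2Table are not Spark's actual types): a shared base type lets the progress-tracking code treat v1 and v2 endpoints uniformly.

```scala
// Stand-in names only; the real shared type is the internal v1/v2 bridge discussed above.
object ProgressSketch {
  trait StreamEndpoint                                    // plays the role of the shared base

  final case class V1Source(name: String) extends StreamEndpoint
  final case class V2Table(name: String) extends StreamEndpoint

  // Progress tracking only needs the shared type, so it doesn't care which API produced it.
  final class ProgressTracker {
    private var latestOffsets = Map.empty[StreamEndpoint, Long]

    def record(endpoint: StreamEndpoint, offset: Long): Unit =
      latestOffsets += (endpoint -> offset)

    def latest(endpoint: StreamEndpoint): Option[Long] = latestOffsets.get(endpoint)
  }
}
```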

Contributor

Why can't implementations extend BaseStreamingSink and leave this interface alone? Not all tables are streaming sinks, so this inheritance is incorrect.

import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
Contributor

Nit: this commit includes non-functional changes. These docs changes are good to have, but they are not related to streaming and there is no need for this file to change in this commit. I would prefer to revert this and include it in a docs-only PR.

Contributor

Same with the changes in WriteBuilder.

Contributor Author

It's related to streaming: https://github.com/apache/spark/pull/24129/files#diff-d111d7e2179b55465840c9a81ea004f2R76

The only unnecessary change is the doc for batch: I replaced BATCH_READ with a Javadoc link, to be consistent with streaming.

Contributor

Javadoc changes are non-functional changes. If you want to correct Javadoc, limit updates to files that are already changing in the PR.

As it is, this is a non-functional change in an unrelated file. The commit message is going to indicate this commit is related to streaming, so this could easily cause conflicts. Please remove it.

Contributor Author

I don't agree with this. I added docs for streaming and changed the batch docs a little to match the streaming doc style. It's very annoying to separate out a minor change like this; I consider this change self-contained.

Contributor

I think there are too many documentation-only changes for this PR to be committed. Please separate them into a different PR. A docs-only commit will be easy to review and easy to cherry-pick based on the description.

Contributor

Did I miss something here? The docs changes are totally related to the current PR (otherwise the docs would refer to a deleted interface).

Contributor

The docs changes that I was referring to were moved already, this was just a poorly placed comment.

@rdblue
Contributor

rdblue commented Mar 19, 2019

@cloud-fan, are you saying that validation cannot be done by the analyzer?

@cloud-fan
Contributor Author

@rdblue I don't have a good idea of how to do the streaming check in the analyzer. Currently we just transform the plan and do the table capability check in MicroBatchExecution and ContinuousExecution manually. It should be possible to move the logic to a rule, but that means micro-batch and continuous need different rules, as the checks for micro-batch and continuous are different.

@rdblue
Contributor

rdblue commented Mar 20, 2019

@cloud-fan, could you be more clear and include the details? Why is this being done in the physical plan nodes in the first place?

@cloud-fan
Contributor Author

Why is this being done in the physical plan nodes in the first place?

How did you come to this conclusion? MicroBatchExecution and ContinuousExecution still deal with logical plans. I mean it's hard to do the check in a central place, since micro-batch and continuous have different checks.

One way I can think of is to give MicroBatchExecution and ContinuousExecution different analyzer rule sets, so that we can add one rule in MicroBatchExecution and another in ContinuousExecution.

@cloud-fan
Contributor Author

FYI, this is the check for micro-batch; it's on the logical plan.

@rdblue
Contributor

rdblue commented Mar 21, 2019

Sorry for the confusion. From the names, like MicroBatchExecution, I thought that these were part of the physical plan. But that doesn't matter. The problem is that this is happening outside of the analyzer. Even if the check is supportsAny(MICROBATCH_READ, CONTINUOUS_READ), something reasonable needs to be added to the analysis checks.

@rdblue
Contributor

rdblue commented Mar 21, 2019

@cloud-fan, I don't think this commit is ready. I've left a few comments and responses. Please consider this a -1 until those are addressed.

@cloud-fan
Contributor Author

@rdblue I've removed BaseStreamingSink from the Table interface using your suggestion; please take another look.

It would be great to continue the discussion in #24129 (comment), although it's not quite related to this PR.

@SparkQA

SparkQA commented Mar 22, 2019

Test build #103834 has finished for PR 24129 at commit 2873d00.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Mar 23, 2019

Test build #103847 has finished for PR 24129 at commit e43d11f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

@rdblue any more comments? I'd like to merge it soon and work on the schema check capability.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 3, 2019

Test build #104225 has finished for PR 24129 at commit e43d11f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* A mix-in interface of {@link Table}, to indicate that it's readable.
* <p>
* This defines {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan
* builder for batch, micro-batch, or continuous processing.
Contributor

I don't think this documentation change is necessary. Looks like it just reformats and rephrases a little.

Contributor Author

The reason is that it's not an internal interface anymore. With the table capability API, users need to implement this interface explicitly.
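To make that concrete, here is a hedged sketch of what an end-user table might look like now that SupportsRead is public: it implements newScanBuilder and advertises its streaming capabilities. The package names and exact method signatures are assumed from the snippets in this PR and may not match the final API.

```scala
import java.util

import org.apache.spark.sql.sources.v2.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Sketch only; packages and signatures are assumed from this PR's diffs.
class ExampleStreamingTable extends Table with SupportsRead {
  override def name(): String = "example"

  override def schema(): StructType = new StructType().add("value", "string")

  // Advertise which streaming modes this source supports; the checks added in this PR read these.
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    throw new UnsupportedOperationException("sketch only")
}
```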

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 17, 2019

Test build #104656 has finished for PR 24129 at commit 937a79b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104684 has finished for PR 24129 at commit e022da0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104691 has finished for PR 24129 at commit e022da0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Member

retest this please

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104695 has finished for PR 24129 at commit e022da0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
// v1 streaming data source only supports micro-batch.
val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
v1StreamingRelations.nonEmpty
Contributor

I think this should be isEmpty because v1 sources don't support continuous mode. If there is a v1 source and nonEmpty is true, then not all sources support continuous.

This would have been caught by a test. Can you add a few test cases for this like the V2WriteSupportCheckSuite?
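For clarity, the condition being described would look roughly like this (variable and capability names follow the snippet above):

```scala
// v1 streaming sources only support micro-batch, so continuous execution additionally
// requires that no v1 relation is present at all.
val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
  v1StreamingRelations.isEmpty
```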

v1StreamingRelations.nonEmpty
if (!allSupportsMicroBatch && !allSupportsContinuous) {
throw new AnalysisException(
"The streaming sources in a query do not have a common supported execution mode.")
Contributor

This error message isn't very specific. Is it possible to show the table names that support each mode so that the user has enough feedback to know what went wrong? That would be more helpful.
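One possible shape for a more specific message, sketched against the names in the snippet above; the wording and grouping are only a suggestion, not what Spark ends up using.

```scala
// Report which tables are limited to which mode so the user can see the conflict.
val microBatchOnly = streamingSources.filterNot(_.supports(CONTINUOUS_READ)).map(_.name())
val continuousOnly = streamingSources.filterNot(_.supports(MICRO_BATCH_READ)).map(_.name())
throw new AnalysisException(
  "The streaming sources in a query do not have a common supported execution mode. " +
    s"Sources supporting only micro-batch: ${microBatchOnly.mkString(", ")}; " +
    s"sources supporting only continuous: ${continuousOnly.mkString(", ")}.")
```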

object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): Unit = {
Contributor

I think these checks should also include a validation for individual tables. If a table doesn't support streaming at all, a more helpful error message is that a specific table doesn't support streaming, not just that there isn't a streaming mode that can handle all of the sources.
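A sketch of the per-table validation being requested, again using the names from the snippets above; the actual check may differ.

```scala
// Fail with the offending table's name if it supports neither streaming read mode.
streamingSources.foreach { table =>
  if (!table.supports(MICRO_BATCH_READ) && !table.supports(CONTINUOUS_READ)) {
    throw new AnalysisException(
      s"Table ${table.name()} does not support either micro-batch or continuous scan.")
  }
}
```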

Contributor Author

If we add this check here, it will never be hit, because we already checked it earlier in DataStreamReader. As I explained, it's non-trivial to move the check from DataStreamReader to here, because it's coupled with the v2 -> v1 fallback logic.

Contributor Author

I think eventually we will have an analyzer rule that checks the streaming scan capability and falls back to v1 if necessary. This checker rule is not suitable, because it just traverses the plan without returning a new plan, so we can't implement the fallback logic in this rule.

Contributor

Even if it will never be hit, it should still be in the analyzer so that plans that are created through other paths in the future are still checked. The idea is to avoid relying on a particular way of creating a logical plan to validate logical plans.

Contributor Author

That's why I opened https://issues.apache.org/jira/browse/SPARK-27483

Plans that are created through other APIs (not DataStreamReader) still need the fallback logic, which cannot be done within this checker rule.

Contributor

Why would this check depend on fallback logic? These checks run after resolution rules have reached a fixed point. If there is a streaming DSv2 relation in the plan, fallback should already be done. Fallback logic is separate and this check can be done here.

Contributor Author

Why would this check depend on fallback logic?

It's the opposite: the fallback logic depends on the check. That said, the future analyzer rule would do 3 things:

  1. find the v2 streaming relations in the plan and check their scan capabilities
  2. if the check fails, fall back to the v1 relation and use it to replace the v2 streaming relation
  3. if fallback is not applicable, fail

You can see that it's not a simple check anymore, so it can't be done within this simpler checker rule (LogicalPlan => Unit).

Contributor

@rdblue rdblue Apr 19, 2019

@cloud-fan, let's get the validation in now.

I don't think that the fallback rule should be implemented as you describe. I think it should be done in 2 parts: the rule to fallback and update the plan, and this validation that all sources support streaming.

Spark should not combine transform rules and validations. There are a couple of reasons for this principle:

  1. Validations are used to ensure that the query is valid and to ensure that rules are run correctly. If the transform rule is added to the analyzer in a single-run batch, we want validation to catch that. These checks catch errors in Spark, too.
  2. Rules should be as small as possible and focused on a single task. The fallback rule should not fail analysis if it doesn't know what to do because some other rule may be added later that does. For example, what if we build an adapter from continuous execution to micro-batch execution for a source?

So we will need a validation rule either way. When the fallback rule runs and can't fix the problem, this check should be what fails the plan.
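A rough, self-contained sketch of that two-part split; every name below is a stand-in (not Spark's actual rules or relation classes), and the point is only that the fallback is a plan-rewriting rule while the capability check remains a side-effect-free validation.

```scala
object FallbackSketch {
  // Stand-in plan nodes.
  sealed trait Plan
  final case class V2StreamingRelation(name: String, supportsStreaming: Boolean,
      v1Fallback: Option[Plan]) extends Plan
  final case class V1StreamingRelation(name: String) extends Plan

  // Part 1: transform rule -- rewrite to v1 when the v2 table can't stream; never throw,
  // because some later rule might still be able to fix the plan.
  def fallbackToV1(plan: Plan): Plan = plan match {
    case r: V2StreamingRelation if !r.supportsStreaming && r.v1Fallback.isDefined =>
      r.v1Fallback.get
    case other => other
  }

  // Part 2: validation -- runs after resolution reaches a fixed point and fails only if no
  // rule could fix the plan.
  def checkStreamingSupport(plan: Plan): Unit = plan match {
    case r: V2StreamingRelation if !r.supportsStreaming =>
      throw new IllegalArgumentException(s"Table ${r.name} does not support streaming scans")
    case _ => ()
  }
}
```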

Contributor Author

added.

case r: StreamingRelation => r
}

if ((streamingSources ++ v1StreamingRelations).length > 1) {
Contributor

Minor: Is ++ a cheap operation? There's no need to create a new seq here when this could check the length of both individually.
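The cheaper form being suggested would be along these lines (names from the snippet above):

```scala
// Compare the two lengths directly instead of materializing a concatenated sequence.
if (streamingSources.length + v1StreamingRelations.length > 1) {
  // ... same handling of multiple sources as above
}
```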

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104734 has finished for PR 24129 at commit 8eb3e4a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104733 has finished for PR 24129 at commit 953b77d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104741 has finished for PR 24129 at commit 30ca36c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 19, 2019

Test build #104745 has finished for PR 24129 at commit 30ca36c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

I've addressed all the comments, @rdblue do you have any more comments?

@cloud-fan
Contributor Author

Since @rdblue has no more comments and @jose-torres has LGTMed, I'm merging to master. Thanks!

@cloud-fan cloud-fan closed this in 85fd552 Apr 26, 2019
@rdblue
Contributor

rdblue commented Apr 26, 2019

@cloud-fan, I was distracted this week by Spark Summit and writing talks.

My position on this PR was quite clear:

@cloud-fan, I don't think this commit is ready. I've left a few comments and responses. Please consider this a -1 until those are addressed.

I'd appreciate it if you would not commit changes with a clear -1 simply because you haven't heard from me for 3 days during a conference.

@rdblue
Contributor

rdblue commented Apr 26, 2019

+1

My remaining concerns were addressed, but I think it was inappropriate to merge this without waiting for a review.

mccheah pushed a commit to palantir/spark that referenced this pull request May 24, 2019
## What changes were proposed in this pull request?

while working on apache#24129, I realized that I missed some document fixes in apache#24285. This PR covers all of them.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#24295 from cloud-fan/doc.
mccheah pushed a commit to palantir/spark that referenced this pull request Jun 6, 2019
This is a followup of apache#24012 , to add the corresponding capabilities for streaming.

existing tests

Closes apache#24129 from cloud-fan/capability.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>